Go 并发编程-8.Sync.Map

普通 Map 的使用和注意事项

Go 内建的 map 类型为 map[K]V,其中,key 类型的 K 必须是可比较的(comparable),也就是可以通过 == 和 != 操作符进行比较;value 的值和类型无所谓,可以是任意的类型,或者为 nil。

在 Go 语言中,bool、整数、浮点数、复数、字符串、指针、Channel、接口都是可比较的,包含可比较元素的 struct 和数组,这俩也是可比较的,而 slice、map、函数值都是不可比较的。

注意事项一

如果使用 struct 类型做 key 其实是有坑的,因为如果 struct 的某个字段值修改了,查询 map 时无法获取它 add 进去的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

type mapKey struct {
key int
}

func main() {
// 声明一个 key 类型为 struct 的 map
var m = make(map[mapKey]string)
var key = mapKey{10}


m[key] = "hello"
fmt.Printf("m[key]=%s\n", m[key])


// 修改key的字段的值后再次查询map,无法获取刚才add进去的值
key.key = 100
fmt.Printf("再次查询m[key]=%s\n", m[key])
}

注意事项二

在 Go 中,map[key]函数返回结果可以是一个值,也可以是两个值。如果获取一个不存在的 key 对应的值时,会返回零值。为了区分真正的零值和 key 不存在这两种情况,可以根据第二个返回值来区分。

1
2
3
4
5
6
7
8
9
10

func main() {
var m = make(map[string]int)
m["a"] = 0
fmt.Printf("a=%d; b=%d\n", m["a"], m["b"]) // 无法区分出是key不存在返回的零值,还是设置的值。

av, aexisted := m["a"]
bv, bexisted := m["b"]
fmt.Printf("a=%d, existed: %t; b=%d, existed: %t\n", av, aexisted, bv, bexisted)
}

注意事项三

map 是无序的,所以当遍历一个 map 对象的时候,迭代的元素的顺序是不确定的,无法保证两次遍历的顺序是一样的,也不能保证和插入的顺序一致。那怎么办呢?如果我们想要按照 key 的顺序获取 map 的值,需要先取出所有的 key 进行排序,然后按照这个排序的 key 依次获取对应的值。而如果我们想要保证元素有序,比如按照元素插入的顺序进行遍历,可以使用辅助的数据结构,比如 orderedmap,来记录插入顺序。

1
2
3
4
5
6
7
8
9
10
11
import "github.com/elliotchance/orderedmap/v2"

func main() {
m := orderedmap.NewOrderedMap[string, any]()

m.Set("foo", "bar")
m.Set("qux", 1.23)
m.Set("123", true)

m.Delete("qux")
}

注意事项四

和 slice 或者 Mutex、RWmutex 等 struct 类型不同,map 对象必须在使用之前初始化。如果不初始化就直接赋值的话,会出现 panic 异常,比如下面的例子,m 实例还没有初始化就直接进行操作会导致 panic

1
2
3
4
func main() {
var m map[int]int
m[100] = 100
}

并且从一个 nil 的 map 对象中获取值不会 panic,而是会得到零值,所以下面的代码不会报错:

1
2
3
4
func main() {
var m map[int]int
fmt.Println(m[100]) // 从一个 nil 的 map 获取值不会 panic
}

特别要注意,map 作为一个 struct 字段的时候,很容易忘记初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
type Counter struct {
Website string
Start time.Time
PageCounters map[string]int
}

func main() {
var c Counter
c.Website = "baidu.com"


c.PageCounters["/"]++ // 未初始化
}

注意事项五

Go 内建的 map 对象不是线程(goroutine)安全的,并发读写的时候运行时会有检查,遇到并发问题就会导致 panic。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
var m = make(map[int]int,10) // 初始化一个map
go func() {
for {
m[1] = 1 //设置key
}
}()

go func() {
for {
_ = m[2] //访问这个map
}
}()
select {}
}

虽然这段代码看起来是读写 goroutine 各自操作不同的元素,貌似 map 也没有扩容的问题,但是运行时检测到同时对 map 对象有并发访问,就会直接 panic。

如何实现线程安全的 Map

避免 map 并发读写 panic 的方式之一就是加锁,考虑到读写性能,可以使用读写锁提供性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

type RWMap struct { // 一个读写锁保护的线程安全的map
sync.RWMutex // 读写锁保护下面的map字段
m map[int]int
}
// 新建一个RWMap
func NewRWMap(n int) *RWMap {
return &RWMap{
m: make(map[int]int, n),
}
}
func (m *RWMap) Get(k int) (int, bool) { //从map中读取一个值
m.RLock()
defer m.RUnlock()
v, existed := m.m[k] // 在锁的保护下从map中读取
return v, existed
}

func (m *RWMap) Set(k int, v int) { // 设置一个键值对
m.Lock() // 锁保护
defer m.Unlock()
m.m[k] = v
}

func (m *RWMap) Delete(k int) { //删除一个键
m.Lock() // 锁保护
defer m.Unlock()
delete(m.m, k)
}

func (m *RWMap) Len() int { // map的长度
m.RLock() // 锁保护
defer m.RUnlock()
return len(m.m)
}

func (m *RWMap) Each(f func(k, v int) bool) { // 遍历map
m.RLock() //遍历期间一直持有读锁
defer m.RUnlock()

for k, v := range m.m {
if !f(k, v) {
return
}
}
}

更高效的并发 Map, 分片加锁

虽然使用读写锁可以提供线程安全的 map,但是在大量并发读写的情况下,锁的竞争会非常激烈。

减少锁的粒度常用的方法就是分片(Shard),将一把锁分成几把锁,每个锁控制一个分片。Go 比较知名的分片并发 map 的实现是orcaman/concurrent-map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
 var SHARD_COUNT = 32

// 分成SHARD_COUNT个分片的map
type ConcurrentMap []*ConcurrentMapShared

// 通过RWMutex保护的线程安全的分片,包含一个map
type ConcurrentMapShared struct {
items map[string]interface{}
sync.RWMutex // Read Write mutex, guards access to internal map.
}

// 创建并发map
func New() ConcurrentMap {
m := make(ConcurrentMap, SHARD_COUNT)
for i := 0; i < SHARD_COUNT; i++ {
m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
}
return m
}


// 根据key计算分片索引
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}

// set 方法
func (m ConcurrentMap) Set(key string, value interface{}) {
// 根据key计算出对应的分片
shard := m.GetShard(key)
shard.Lock() //对这个分片加锁,执行业务操作
shard.items[key] = value
shard.Unlock()
}

// get 方法
func (m ConcurrentMap) Get(key string) (interface{}, bool) {
// 根据key计算出对应的分片
shard := m.GetShard(key)
shard.RLock()
// 从这个分片读取key的值
val, ok := shard.items[key]
shard.RUnlock()
return val, ok
}

加锁和分片加锁这两种方案都比较常用,如果是追求更高的性能,显然是分片加锁更好,因为它可以降低锁的粒度,进而提高访问此 map 对象的吞吐。如果并发性能要求不是那么高的场景,简单加锁方式更简单。

官方提供的 sync.Map

这个 sync.Map 并不是用来替换内建的 map 类型的,它只能被应用在一些特殊的场景里。

  1. 只会增长的缓存系统中,一个 key 只写入一次而被读很多次;
  2. 多个 goroutine 为不相交的键集读、写和重写键值对。

sync.Map 的实现原理如下:

  • 空间换时间。通过冗余的两个数据结构(只读的 read 字段、可写的 dirty),来减少加锁对性能的影响。对只读字段(read)的操作不需要加锁。
  • 优先从 read 字段读取、更新、删除,因为对 read 字段的读取不需要锁。
  • 动态调整。miss 次数多了之后,将 dirty 数据提升为 read,避免总是从 dirty 中加锁读取。
  • double-checking。加锁之后先还要再检查 read 字段,确定真的不存在才操作 dirty 字段。
  • 延迟删除。删除一个键值只是打标记,只有在提升 dirty 字段为 read 字段的时候才清理删除的数据。

    Map 的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

type Map struct {
mu Mutex
// 基本上你可以把它看成一个安全的只读的map
// 它包含的元素其实也是通过原子操作更新的,但是已删除的entry就需要加锁操作了
read atomic.Value // readOnly

// 包含需要加锁才能访问的元素
// 包括所有在read字段中但未被expunged(删除)的元素以及新加的元素
dirty map[interface{}]*entry // dirty 指将最新写入的数据则存在 dirty 字段上。

// 记录从read中读取miss的次数,一旦miss数和dirty长度一样了,就会把dirty提升为read,并把dirty置空
misses int // misses 字段用来统计 read 被穿透的次数(被穿透指需要读 dirty 的情况)
}

type readOnly struct {
m map[interface{}]*entry
amended bool // 当dirty中包含read没有的数据时为true,比如新增一条数据
}

// expunged是用来标识此项已经删掉的指针
// 当map中的一个项目被删除了,只是把它的值标记为expunged,以后才有机会真正删除此项
var expunged = unsafe.Pointer(new(interface{}))

// entry代表一个值
type entry struct {
p unsafe.Pointer // *interface{}
}

Store

Store 可以用来设置一个键值对,或者更新一个键值对。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
// 如果read字段包含这个项,说明是更新,cas更新项目的值即可
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}

// read中不存在,或者cas更新失败,就需要加锁访问dirty了
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok { // 双检查,看看read是否已经存在了
if e.unexpungeLocked() {
// 此项目先前已经被删除了,通过将它的值设置为nil,标记为unexpunged
m.dirty[key] = e
}
e.storeLocked(&value) // 更新
} else if e, ok := m.dirty[key]; ok { // 如果dirty中有此项
e.storeLocked(&value) // 直接更新
} else { // 否则就是一个新的key
if !read.amended { //如果dirty为nil
// 需要创建dirty对象,并且标记read的amended为true,
// 说明有元素它不包含而dirty包含
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value) //将新值增加到dirty对象中
}
m.mu.Unlock()
}

可以看出,Store 既可以是新增元素,也可以是更新元素。如果运气好的话,更新的是已存在的未被删除的元素,直接更新即可,不会用到锁。如果运气不好,需要更新(重用)删除的对象、更新还未提升的 dirty 中的对象,或者新增加元素的时候就会使用到了锁,这个时候,性能就会下降。

所以从这一点来看,sync.Map 适合那些只会增长的缓存系统,可以进行更新,但是不要删除,并且不要频繁地增加新元素。

新加的元素需要放入到 dirty 中,如果 dirty 为 nil,那么需要从 read 字段中复制出来一个 dirty 对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

func (m *Map) dirtyLocked() {
if m.dirty != nil { // 如果dirty字段已经存在,不需要创建了
return
}

read, _ := m.read.Load().(readOnly) // 获取read字段
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m { // 遍历read字段
if !e.tryExpungeLocked() { // 把非punged的键值对复制到dirty中
m.dirty[k] = e
}
}
}

Load

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
// 首先从read处理
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended { // 如果不存在并且dirty不为nil(有新的元素)
m.mu.Lock()
// 双检查,看看read中现在是否存在此key
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {//依然不存在,并且dirty不为nil
e, ok = m.dirty[key]// 从dirty中读取
// 不管dirty中存不存在,miss数都加1
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load() //返回读取的对象,e既可能是从read中获得的,也可能是从dirty中获得的
}

如果幸运的话,我们从 read 中读取到了这个 key 对应的值,那么就不需要加锁了,性能会非常好。但是,如果请求的 key 不存在或者是新加的,就需要加锁从 dirty 中读取。所以,读取不存在的 key 会因为加锁而导致性能下降,读取还没有提升的新值的情况下也会因为加锁性能下降。

其中,missLocked 增加 miss 的时候,如果 miss 数等于 dirty 长度,会将 dirty 提升为 read,并将 dirty 置空。

1
2
3
4
5
6
7
8
9
func (m *Map) missLocked() {
m.misses++ // misses计数加一
if m.misses < len(m.dirty) { // 如果没达到阈值(dirty字段的长度),返回
return
}
m.read.Store(readOnly{m: m.dirty}) //把dirty字段的内存提升为read字段
m.dirty = nil // 清空dirty
m.misses = 0 // misses数重置为0
}

Delete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// 双检查
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// 这一行长坤在1.15中实现的时候忘记加上了,导致在特殊的场景下有些key总是没有被回收
delete(m.dirty, key)
// miss数加1
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}

func (m *Map) Delete(key interface{}) {
m.LoadAndDelete(key)
}
func (e *entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*interface{})(p), true
}
}
}

如果 read 中不存在,那么就需要从 dirty 中寻找这个项目。最终,如果项目存在就删除(将它的值标记为 nil)。如果项目不为 nil 或者没有被标记为 expunged,那么还可以把它的值返回。

总结